/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/******************************************************************************
 * $Id: fifo.h 9655 2013-06-25 23:08:13Z xlou $
 *
 *****************************************************************************/

/** @file
 * Declaration and inline implementation of the joblist::FIFO class template.
 */

#pragma once

#include <vector>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <stdexcept>
#include "elementtype.h"
#include "datalistimpl.h"

namespace joblist
{
/** @brief class FIFO
 *
 */

/* FIFO derives from DataListImpl<std::vector<element_t>, element_t>, but it
 * keeps its elements in its own pBuffer/cBuffer arrays and manages them itself. */
/**
 * Bounded, double-buffered queue between a producer and base::numConsumers
 * consumers.
 *
 * The producer fills pBuffer through insert(); once fMaxElements elements have
 * been stored, swapBuffers() exchanges pBuffer and cBuffer.  Every consumer
 * keeps its own read index (cpos[id]) into cBuffer, so each consumer reads
 * every element of a buffer.  The producer may swap again only after all
 * consumers have finished the current cBuffer (cDone == numConsumers).
 *
 * Shared state is guarded by base::mutex together with the two condition
 * variables below.  insert() touches pBuffer/ppos without taking the lock and
 * setMultipleProducers(true) throws.
 * NOTE(review): this looks like a single-producer design -- confirm with callers.
 */
template <typename element_t>
class FIFO : public DataListImpl<std::vector<element_t>, element_t>
{
 private:
  // Shorthand for the base class; used to reach its members (mutex,
  // numConsumers, noMoreInput, ...) from inside the template.
  using base = DataListImpl<std::vector<element_t>, element_t>;

 public:
  // Values accepted by setElementMode(); see the comment on that method.
  enum ElementMode
  {
    RID_ONLY,
    RID_VALUE
  };

  // numConsumers: number of readers; maxElements: capacity of each buffer.
  FIFO(uint32_t numConsumers, uint32_t maxElements);
  ~FIFO() override;

  /* DataList<element_t> interface */
  // Appends one element; blocks in swapBuffers() when the buffer becomes full
  // and the consumers have not finished the previous one.
  inline void insert(const element_t& e) override;
  // Appends every element of v, in order, via insert(const element_t&).
  inline void insert(const std::vector<element_t>& v) override;
  // Copies the next element for consumer `it` into *e; returns false once
  // input has ended and that consumer has read everything.
  inline bool next(uint64_t it, element_t* e) override;
  uint64_t getIterator() override;
  // Publishes any partially filled buffer and marks the input as finished.
  void endOfInput() override;
  // Only `false` is accepted; `true` throws std::logic_error.
  void setMultipleProducers(bool b) override;

  /* Use this insert() to detect when insertion fills up buffer.      */
  /* When this happens, call waitTillReadyForInserts() before resuming*/
  /* with more inserts.                                               */
  inline void insert(const element_t& e, bool& bufferFullBlocked, bool& consumptionStarted);
  inline void waitTillReadyForInserts();
  // True while the producer buffer is full (ppos == fMaxElements).
  inline bool isOutputBlocked() const;

  // OID accessors simply forward to the base class.
  void OID(execplan::CalpontSystemCatalog::OID oid) override
  {
    base::OID(oid);
  }
  execplan::CalpontSystemCatalog::OID OID() const override
  {
    return base::OID();
  }

  // Intentionally empty: dropping a token is a no-op for a FIFO.
  inline void dropToken() {};
  inline void dropToken(uint32_t){};

  // Counters that reflect how many times this FIFO blocked on reads/writes
  uint64_t blockedWriteCount() const;
  uint64_t blockedReadCount() const;

  // @bug 653 set number of consumers when it is empty.
  void setNumConsumers(uint32_t nc) override;

  // Plain flag accessors; this header only stores and returns fInOrder.
  void inOrder(bool order)
  {
    fInOrder = order;
  }
  bool inOrder() const
  {
    return fInOrder;
  }

  // Default behavior tracks number of element inserts.  If application
  // is inserting complex elements (ex: RowWrappers), then application
  // should call totalSize() mutator to record accurate element count.
  // This is not a blocking call.  If totalSize() accessor is called
  // prior to completion, the "current" total size will be returned.
  void totalSize(const uint64_t totSize)
  {
    fTotSize = totSize;
  }
  uint64_t totalSize() override
  {
    return fTotSize;
  }

  // Changes the per-buffer capacity; when the value differs from the current
  // one the existing buffers are freed and re-created by the next insert().
  void maxElements(uint64_t max);
  uint64_t maxElements()
  {
    return fMaxElements;
  }

  // FIFO only uses elementmode to control how the elements are saved
  // to temp disk, should the application choose to page to disk.  If
  // a FIFO is converted to another datalist, this enum should be copied
  // over to the datalist, as a ZDL for example can use the element mode
  // for other reasons.
  void setElementMode(uint32_t mode)
  {
    fElementMode = mode;
  }
  uint32_t getElementMode() const
  {
    return fElementMode;
  }

  // Total number of files and filespace used for temp files
  void setTotalFileCounts(uint64_t numFiles, uint64_t numBytes);
  void totalFileCounts(uint64_t& numFiles, uint64_t& numBytes) const;

  // returns true if there might be more data to read,
  // false if there is no more data.  Similar to next(), but
  // does not return data.
  bool more(uint64_t id);

 protected:
 private:
  // finishedConsuming: signalled when the last consumer finishes cBuffer
  //                    (the producer waits on it before swapping).
  // moreData:          signalled after a swap or at end of input
  //                    (consumers wait on it in waitForSwap()).
  boost::condition finishedConsuming, moreData;

  element_t* pBuffer;  // buffer currently being filled by insert()
  element_t* cBuffer;  // buffer currently being read by next()
  uint64_t ppos;       // index of the next free slot in pBuffer
  uint64_t* cpos;      // per-consumer read index into cBuffer; fMaxElements means "nothing left"
  uint64_t cDone;      // number of consumers that have finished the current cBuffer
  uint64_t fMaxElements;  // capacity of each buffer; endOfInput() shrinks it to the final fill level
  uint64_t cWaiting;      // number of consumer waits on moreData since the last notify in swapBuffers()
  uint64_t fTotSize;      // running element count (or the value set through totalSize())
  bool fInOrder;
  uint64_t fConsumerFinishedCount;  // consumers that have seen end of data; buffers are freed at numConsumers
  // Set by next(), read by the non-blocking insert() without holding the lock.
  // NOTE(review): volatile does not provide inter-thread synchronization.
  volatile bool fConsumptionStarted;
  uint32_t fElementMode;
  uint64_t fNumFiles;
  uint64_t fNumBytes;

  // Counters that reflect how many times this FIFO blocked
  // on reads and writes due to the FIFO being empty or full.
  uint64_t blockedInsertWriteCount;
  uint64_t blockedNextReadCount;

  // Copying and default construction are not supported: these are private and
  // their definitions throw std::logic_error.
  FIFO& operator=(const FIFO&);
  FIFO(const FIFO&);
  FIFO();

  // Marks one more consumer as done with cBuffer and wakes the producer when
  // it is the last one.
  void signalPs();
  // Exchanges pBuffer/cBuffer; returns true only when it would have had to
  // wait and waitIfBlocked is false.
  bool swapBuffers(bool waitIfBlocked = true);
  // Blocks consumer `id` until new data is published or input ends.
  bool waitForSwap(uint64_t id);
};

// #define FIFO_DEBUG

// Toggles consumer behavior so that it only has one critical section.
// Need to benchmark both ways before changing it.
// #define ONE_CS

// Builds an empty FIFO for `con` consumers whose two buffers will each hold
// `max` elements.  The buffers themselves are allocated lazily by the first
// insert().  Every consumer position starts at fMaxElements and cDone starts
// at `con`, i.e. the (not yet existing) consumer buffer counts as fully
// consumed so that the first swapBuffers() does not wait.
template <typename element_t>
FIFO<element_t>::FIFO(uint32_t con, uint32_t max)
 : DataListImpl<std::vector<element_t>, element_t>(con)
 , pBuffer(nullptr)
 , cBuffer(nullptr)
 , ppos(0)
 , cpos(new uint64_t[con])
 , cDone(con)
 , fMaxElements(max)
 , cWaiting(0)
 , fTotSize(0)
 , fInOrder(false)
 , fConsumerFinishedCount(0)
 , fConsumptionStarted(false)
 , fElementMode(RID_ONLY)
 , fNumFiles(0)
 , fNumBytes(0)
 , blockedInsertWriteCount(0)
 , blockedNextReadCount(0)
{
  // A position equal to fMaxElements means "nothing left to read".
  for (uint64_t consumer = 0; consumer < con; ++consumer)
    cpos[consumer] = fMaxElements;
}

// Default construction is not supported.  This constructor is declared private
// in the class and unconditionally throws std::logic_error if it ever runs.
template <typename element_t>
FIFO<element_t>::FIFO()
{
  throw std::logic_error("don't use FIFO()");
}

// Copy construction is not supported.  The constructor is private and always
// throws std::logic_error; the source object is never examined, so the
// parameter is left unnamed.
template <typename element_t>
FIFO<element_t>::FIFO(const FIFO<element_t>&)
{
  throw std::logic_error("don't use FIFO(FIFO &)");
}

// Copy assignment is not supported.  The operator is private and always throws
// std::logic_error; the right-hand side is never examined, so the parameter is
// left unnamed.
template <typename element_t>
FIFO<element_t>& FIFO<element_t>::operator=(const FIFO<element_t>&)
{
  throw std::logic_error("don't use FIFO:: =");
}

// Releases both element buffers and the per-consumer position array.
// The buffers may already be null (never allocated, or freed by waitForSwap()
// or maxElements()); delete[] on a null pointer does nothing, so no explicit
// checks are needed.
template <typename element_t>
FIFO<element_t>::~FIFO()
{
  delete[] pBuffer;
  delete[] cBuffer;
  delete[] cpos;
}

// Producer-side exchange of the two buffers, performed under base::mutex.
//
// If some consumer is still reading cBuffer (cDone < numConsumers) the blocked
// write counter is bumped and then:
//   - waitIfBlocked == false: nothing is swapped and true is returned
//     ("the swap would have blocked");
//   - waitIfBlocked == true:  the call waits on finishedConsuming until every
//     consumer is done.
// After the exchange the producer position and all consumer positions are
// reset to 0, waiting consumers are woken, and false is returned.
template <typename element_t>
bool FIFO<element_t>::swapBuffers(bool waitIfBlocked)
{
  boost::mutex::scoped_lock guard(base::mutex);

  if (cDone < base::numConsumers)
  {
    ++blockedInsertWriteCount;

    if (!waitIfBlocked)
      return true;

    // The condition is known to hold on entry, so wait at least once and
    // re-check after every wake-up.
    do
    {
      finishedConsuming.wait(guard);
    } while (cDone < base::numConsumers);
  }

  // Hand the freshly filled buffer to the consumers and take back the one
  // they have finished with.
  element_t* const drained = cBuffer;
  cBuffer = pBuffer;
  pBuffer = drained;

  cDone = 0;
  ppos = 0;

  for (uint64_t consumer = 0; consumer < base::numConsumers; ++consumer)
    cpos[consumer] = 0;

  if (cWaiting != 0)
  {
    moreData.notify_all();
    cWaiting = 0;
  }

  return false;
}

// Appends one element to the producer buffer.
// Both buffers are allocated on first use (or again after they were freed).
// When the buffer becomes full it is handed to the consumers through
// swapBuffers(), which blocks until they have finished the previous buffer.
// The producer buffer and ppos are accessed here without taking base::mutex.
template <typename element_t>
inline void FIFO<element_t>::insert(const element_t& e)
{
  if (pBuffer == nullptr)
  {
    pBuffer = new element_t[fMaxElements];
    cBuffer = new element_t[fMaxElements];
  }

  pBuffer[ppos++] = e;
  ++fTotSize;

  if (ppos != fMaxElements)
    return;

  swapBuffers();
}

// Non-blocking variant of insert(): stores the element and, if that filled the
// buffer, attempts the swap without waiting.
//
// bufferFullBlocked  - out: true when the buffer is full and the swap could not
//                      be performed because consumers are still reading.  The
//                      buffer is then still full; the caller must call
//                      waitTillReadyForInserts() before inserting again.
// consumptionStarted - out: snapshot of fConsumptionStarted (set by next()),
//                      taken before the swap attempt.
template <typename element_t>
inline void FIFO<element_t>::insert(const element_t& e, bool& bufferFullBlocked, bool& consumptionStarted)
{
  if (pBuffer == nullptr)
  {
    pBuffer = new element_t[fMaxElements];
    cBuffer = new element_t[fMaxElements];
  }

  pBuffer[ppos++] = e;
  ++fTotSize;

  bufferFullBlocked = false;
  consumptionStarted = fConsumptionStarted;

  if (ppos != fMaxElements)
    return;

  bufferFullBlocked = swapBuffers(false);
}

// Companion to the non-blocking insert(): if the producer buffer is still full,
// perform the (blocking) swap so that inserts can resume.  Does nothing when
// the buffer has room.
template <typename element_t>
inline void FIFO<element_t>::waitTillReadyForInserts()
{
  if (ppos != fMaxElements)
    return;

  swapBuffers();
}

// Reports whether the producer buffer is currently full, i.e. a swap is
// pending before another element can be inserted.
template <typename element_t>
inline bool FIFO<element_t>::isOutputBlocked() const
{
  return ppos == fMaxElements;
}

// Appends every element of the vector, in order, using the single-element
// insert(); this may therefore block whenever a buffer fills up.
template <typename element_t>
inline void FIFO<element_t>::insert(const std::vector<element_t>& e)
{
  for (const element_t& item : e)
    insert(item);
}

// Consumer-side wait, performed under base::mutex.
//
// Blocks consumer `id` while it has nothing left to read in the current buffer
// (cpos[id] == fMaxElements) and the producer has not signalled end of input.
// Each wait bumps cWaiting and the blocked-read counter.
//
// Returns true when new data is available for this consumer, false when input
// has ended and this consumer has read everything.  When the last consumer
// reaches that state, both element buffers are freed.
//
// With ONE_CS defined this function also does the "consumer finished the
// buffer" accounting that signalPs() does otherwise.
template <typename element_t>
bool FIFO<element_t>::waitForSwap(uint64_t id)
{
  boost::mutex::scoped_lock scoped(base::mutex);

#ifdef ONE_CS

  if (cpos[id] == fMaxElements)
  {
    // Last consumer to finish the buffer wakes the producer.
    if (++cDone == base::numConsumers)
      finishedConsuming.notify_all();
  }

#endif

  while (cpos[id] == fMaxElements && !base::noMoreInput)
  {
    ++cWaiting;
    blockedNextReadCount++;
    moreData.wait(scoped);
  }

  if (cpos[id] == fMaxElements)
  {
    // Before we free the lock, let's check to see if all our consumers
    // are finished, in which case we can delete our data buffers.
    if (++fConsumerFinishedCount == base::numConsumers)
    {
      delete[] pBuffer;
      delete[] cBuffer;
      pBuffer = 0;
      cBuffer = 0;
    }

    return false;
  }

  return true;
}

// Non-consuming check for consumer `id`, evaluated under base::mutex.
// Returns false only when the consumer has nothing left in the current buffer
// and the input has been marked finished; otherwise returns true (there is, or
// may still be, data to read).
template <typename element_t>
bool FIFO<element_t>::more(uint64_t id)
{
  boost::mutex::scoped_lock guard(base::mutex);
  return cpos[id] != fMaxElements || !base::noMoreInput;
}

// Records, under base::mutex, that one more consumer has finished the current
// consumer buffer.  The consumer that brings the count up to numConsumers wakes
// anything waiting on finishedConsuming (the producer in swapBuffers() or
// endOfInput()).
template <typename element_t>
void FIFO<element_t>::signalPs()
{
  boost::mutex::scoped_lock guard(base::mutex);

  ++cDone;

  if (cDone == base::numConsumers)
    finishedConsuming.notify_all();
}

// Copies the next element for consumer `id` into *out.
//
// id  - consumer index used to address this consumer's read position.
// out - destination for the element; must not be null.
//
// Returns true when an element was stored in *out, false when the input has
// ended and this consumer has already read everything.
//
// If the consumer has exhausted the current buffer, the call blocks in
// waitForSwap() until the producer publishes a new one or ends the input.
// Unless ONE_CS is defined, the consumer that reads the last element of a
// buffer reports that through signalPs().
//
// The lock is held through a scoped_lock so that base::mutex is released even
// if the element's copy-assignment throws.  It is dropped explicitly around
// waitForSwap() and signalPs() because both of them lock base::mutex
// themselves.
template <typename element_t>
inline bool FIFO<element_t>::next(uint64_t id, element_t* out)
{
  boost::mutex::scoped_lock scoped(base::mutex);
  fConsumptionStarted = true;

  if (cpos[id] >= fMaxElements)
  {
    scoped.unlock();

    if (!waitForSwap(id))
      return false;

    scoped.lock();
  }

  *out = cBuffer[cpos[id]++];

#ifndef ONE_CS

  if (cpos[id] == fMaxElements)
  {
    scoped.unlock();
    signalPs();
  }

#endif

  // scoped releases the mutex here if it is still held.
  return true;
}

// Called by the producer when no more elements will be inserted.
//
// If the producer buffer holds a partial fill (ppos != 0), waits until the
// consumers have finished the current consumer buffer, shrinks fMaxElements to
// the number of elements actually present, and publishes that buffer with all
// consumer positions reset to 0.  Then forwards to base::endOfInput() and wakes
// any consumers waiting for data.  Everything runs under base::mutex.
template <typename element_t>
void FIFO<element_t>::endOfInput()
{
  boost::mutex::scoped_lock guard(base::mutex);

  const bool havePartialBuffer = (ppos != 0);

  if (havePartialBuffer)
  {
    while (cDone < base::numConsumers)
      finishedConsuming.wait(guard);

    // From now on "end of buffer" means the end of the partial fill.
    fMaxElements = ppos;

    element_t* const drained = cBuffer;
    cBuffer = pBuffer;
    pBuffer = drained;

    cDone = 0;

    for (uint64_t consumer = 0; consumer < base::numConsumers; ++consumer)
      cpos[consumer] = 0;
  }

  base::endOfInput();

  if (cWaiting != 0)
    moreData.notify_all();
}

// Returns base::getIterator(), called while holding base::mutex.
// NOTE(review): the returned value is presumably the consumer id expected by
// next()/more() -- confirm against DataListImpl.
template <typename element_t>
uint64_t FIFO<element_t>::getIterator()
{
  boost::mutex::scoped_lock guard(base::mutex);
  return base::getIterator();
}

// Returns how many times swapBuffers() found the producer buffer full while
// consumers were still reading the previous one.  The counter is read without
// taking base::mutex.
template <typename element_t>
uint64_t FIFO<element_t>::blockedWriteCount() const
{
  return blockedInsertWriteCount;
}

// Returns how many times a consumer had to wait for data in waitForSwap().
// The counter is read without taking base::mutex.
template <typename element_t>
uint64_t FIFO<element_t>::blockedReadCount() const
{
  return blockedNextReadCount;
}

// FIFO does not support multiple producers: passing false is accepted and does
// nothing, passing true throws std::logic_error.
template <typename element_t>
void FIFO<element_t>::setMultipleProducers(bool b)
{
  if (!b)
    return;

  throw std::logic_error("FIFO: setMultipleProducers() doesn't work yet");
}

//@bug 653
// Changes the number of consumers.  Intended to be called while the FIFO is
// empty: it takes no lock and discards all existing consumer positions.
//
// The base class is updated, a new position array sized for `nc` consumers is
// installed with every position set to fMaxElements ("nothing to read"), and
// cDone is set to `nc` so that the next swapBuffers() does not wait.
//
// The replacement array is allocated before anything is modified and the old
// array is released only at the end.  If the allocation or the base-class call
// throws, cpos still points at valid storage and the destructor cannot
// double-delete it.
template <typename element_t>
void FIFO<element_t>::setNumConsumers(uint32_t nc)
{
  uint64_t* replacement = new uint64_t[nc];

  try
  {
    base::setNumConsumers(nc);
  }
  catch (...)
  {
    delete[] replacement;
    throw;
  }

  for (uint64_t i = 0; i < nc; ++i)
    replacement[i] = fMaxElements;

  delete[] cpos;
  cpos = replacement;
  cDone = nc;
}

//@bug 864
// Changes the capacity of each buffer.  A call with the current capacity does
// nothing.  Otherwise both buffers are freed (the next insert() re-allocates
// them at the new size) and every consumer position is set to the new capacity,
// i.e. "nothing to read".  No lock is taken and ppos is left untouched.
// NOTE(review): presumably meant to be called before any data is inserted --
// confirm with callers.
template <typename element_t>
void FIFO<element_t>::maxElements(uint64_t max)
{
  if (fMaxElements == max)
    return;

  fMaxElements = max;

  delete[] pBuffer;
  pBuffer = nullptr;

  delete[] cBuffer;
  cBuffer = nullptr;

  for (uint64_t consumer = 0; consumer < base::numConsumers; ++consumer)
    cpos[consumer] = max;
}

//
// Sets/Returns the number of temp files and the space taken up by those files
// (in bytes) by this FIFO collection, if the application code chose to cache
// to disk.
//
// setTotalFileCounts() only records the two values supplied by the caller;
// nothing in this header computes them.
//
template <typename element_t>
void FIFO<element_t>::setTotalFileCounts(uint64_t numFiles, uint64_t numBytes)
{
  fNumFiles = numFiles;
  fNumBytes = numBytes;
}
// Copies the recorded temp-file count and byte total into the two output
// parameters; both are 0 unless setTotalFileCounts() has been called.
template <typename element_t>
void FIFO<element_t>::totalFileCounts(uint64_t& numFiles, uint64_t& numBytes) const
{
  numFiles = fNumFiles;
  numBytes = fNumBytes;
}

}  // namespace joblist
